/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.engine.meta.crawler

import com.chinamobile.cmss.lakehouse.engine.meta.crawler.infer.ColumnsInfer.DirectoryColumnsInfer
import com.chinamobile.cmss.lakehouse.engine.meta.crawler.model._
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.apache.hadoop.fs.Path

import scala.collection.mutable

/**
 * Flink flat-map step that converts a candidate table directory ([[MayBeTable]]) into zero
 * or more [[Table]] descriptions by inferring columns from the underlying paths.
 *
 * Emission rules:
 *  - no partition paths: columns are inferred from the table path itself and a single
 *    un-partitioned table is emitted when inference succeeds;
 *  - partition paths that all infer successfully to the same columns and sit at the same
 *    depth below the table path: a single partitioned table is emitted;
 *  - anything else: every partition path whose inference succeeded is emitted as its own
 *    un-partitioned table, named after the last segment of that path.
 *
 * Paths whose column inference fails are dropped without being reported.
 *
 * @param crawlerConfig configuration handed to the column inferrer
 */
case class MapTable(crawlerConfig: CrawlerConfig)
  extends FlatMapFunction[MayBeTable, Table] {

  /**
   * Infers columns for the candidate table and emits the resulting table(s).
   *
   * @param mayBeTable candidate table: name, root path and discovered partition paths
   * @param out        collector receiving every resolved [[Table]]
   */
  override def flatMap(mayBeTable: MayBeTable, out: Collector[Table]): Unit = {
    val infer: DirectoryColumnsInfer = DirectoryColumnsInfer(crawlerConfig)
    mayBeTable match {
      case MayBeTable(tableName, tablePath, Nil) =>
        // Table without partitions: infer directly from the table directory.
        infer
          .infer(new Path(tablePath))
          .map(columns => Table(tableName, tablePath, columns, List(), List[PartitionValue]()))
          .foreach(out.collect(_))

      case MayBeTable(tableName, tablePath, partitionPaths) =>
        val attempts = partitionPaths.map(path => (path, infer.infer(new Path(path))))
        // Keep only the partitions whose columns could be inferred: (path, columns).
        val inferred = attempts.collect {
          case (path, attempt) if attempt.isSuccess => (path, attempt.get)
        }
        // columns -> [(path, columns)]
        val bySchema = inferred.groupBy(_._2)
        // The depth check is evaluated last so it only runs when the partitions would
        // otherwise be merged. Without it, a partition nested at a different depth makes
        // `transpose` in resolvePartition throw IllegalArgumentException.
        val coherent =
          bySchema.size == 1 &&
            inferred.size == attempts.size &&
            hasUniformDepth(tablePath, partitionPaths)

        if (coherent) {
          val (partitions, partitionValues) = resolvePartition(tablePath, partitionPaths)
          out.collect(
            Table(tableName, tablePath, bySchema.head._1, partitions, partitionValues)
          )
        } else {
          // Flat-mapping the grouped Map yields a Map keyed by path, so a path that was
          // listed more than once is emitted a single time.
          bySchema.flatMap(_._2).foreach { case (path, columns) =>
            out.collect(
              Table(new Path(path).getName, path, columns, List(), List[PartitionValue]())
            )
          }
        }
    }
  }

  /**
   * Splits every partition path on '/' and drops as many leading segments as the table
   * path has. The cut is purely positional: it does not verify that a partition path
   * actually starts with the table path.
   */
  private def relativeSegments(
      tablePath: String,
      partitionPaths: List[String]
  ): List[List[String]] = {
    val tableDepth = StringUtils.split(tablePath, '/').length
    partitionPaths.map(path => StringUtils.split(path, '/').drop(tableDepth).toList)
  }

  /** True when all partition paths have the same number of segments below the table path. */
  private def hasUniformDepth(tablePath: String, partitionPaths: List[String]): Boolean =
    relativeSegments(tablePath, partitionPaths).map(_.size).distinct.size <= 1

  /**
   * Derives partition columns and per-partition values from the directory layout.
   *
   * The path segments below the table path are processed level by level. For one level:
   *  - if `readPartitionValue` yields a (name, value) pair for every segment and all pairs
   *    share the same name, that name becomes the partition column and each pair's value
   *    is recorded;
   *  - otherwise the column is named `partition<k>` (k counts such generated names from 0)
   *    and the raw segment text is recorded as the value.
   *
   * All partition paths must have the same depth below the table path; callers check this
   * with `hasUniformDepth`, because `transpose` rejects rows of differing length.
   *
   * @param tablePath      root path of the table
   * @param partitionPaths paths of the partition directories
   * @return the partition columns (all typed as `StringType`) and, for every partition
   *         path in input order, its values ordered by level
   */
  private def resolvePartition(
      tablePath: String,
      partitionPaths: List[String]
  ): (List[ColumnInfo], List[PartitionValue]) = {
    import com.chinamobile.cmss.lakehouse.engine.meta.crawler.util.TableUtils._

    // One inner list per directory level, holding that level's segment of every partition.
    val layers = relativeSegments(tablePath, partitionPaths).transpose
    val emptyValues = partitionPaths.map(PartitionValue(_, List()))

    // Accumulator: (column names so far, values so far, number of generated names).
    val (names, resolvedValues, _) =
      layers.foldLeft((Vector.empty[String], emptyValues, 0)) {
        case ((namesSoFar, valuesSoFar, generated), layer) =>
          val pairs = layer.flatMap(readPartitionValue(_))
          val keys = pairs.map(_._1).distinct
          val standardMode = pairs.size == layer.size && keys.size == 1
          if (standardMode) {
            val extended = valuesSoFar.zip(pairs).map {
              case (pv, pair) => pv.copy(values = pv.values ::: List(pair._2))
            }
            (namesSoFar :+ keys.head, extended, generated)
          } else {
            val extended = valuesSoFar.zip(layer).map {
              case (pv, segment) => pv.copy(values = pv.values ::: List(segment))
            }
            (namesSoFar :+ s"partition$generated", extended, generated + 1)
          }
      }

    (names.toList.map(ColumnInfo(_, StringType)), resolvedValues)
  }
}
